// Copyright 2025 Supabase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorytopo

import (
	"context"
	"fmt"
	"path"
	"strings"

	"github.com/multigres/multigres/go/clustermetadata/topo"
	"github.com/multigres/multigres/go/mterrors"
	"github.com/multigres/multigres/go/pb/mtrpc"
)

// Below we implement the File methods that are part of the Conn interface:
//

// Create is part of topo.Conn interface.
func (c *conn) Create(ctx context.Context, filePath string, contents []byte) (topo.Version, error) {
	// c.factory.callstats.Add([]string{"Create"}, 1)

	if err := c.dial(ctx); err != nil {
		return nil, err
	}

	if contents == nil {
		contents = []byte{}
	}

	c.factory.mu.Lock()
	defer c.factory.mu.Unlock()

	if c.factory.err != nil {
		return nil, c.factory.err
	}
	if err := c.factory.getOperationError(Create, filePath); err != nil {
		return nil, err
	}

	// Get the parent dir.
	dir, file := path.Split(filePath)
	p := c.factory.getOrCreatePath(c.cell, dir)
	if p == nil {
		return nil, mterrors.Errorf(mtrpc.Code_INVALID_ARGUMENT, "trying to create file %v in cell %v in a path that contains files", filePath, c.cell)
	}

	// Check the file doesn't already exist.
	if _, ok := p.children[file]; ok {
		return nil, topo.NewError(topo.NodeExists, file)
	}

	// Create the file.
	n := c.factory.newFile(file, contents, p)
	p.children[file] = n

	n.propagateRecursiveWatch(&topo.WatchDataRecursive{
		Path: filePath,
		WatchData: topo.WatchData{
			Contents: n.contents,
			Version:  NodeVersion(n.version),
		},
	})

	return NodeVersion(n.version), nil
}

// Update is part of topo.Conn interface.
func (c *conn) Update(ctx context.Context, filePath string, contents []byte, version topo.Version) (topo.Version, error) {
	//	c.factory.callstats.Add([]string{"Update"}, 1)

	if err := c.dial(ctx); err != nil {
		return nil, err
	}

	if contents == nil {
		contents = []byte{}
	}

	c.factory.mu.Lock()
	defer c.factory.mu.Unlock()

	if c.factory.err != nil {
		return nil, c.factory.err
	}
	if err := c.factory.getOperationError(Update, filePath); err != nil {
		return nil, err
	}

	// Get the parent dir, we'll need it in case of creation.
	dir, file := path.Split(filePath)
	p := c.factory.nodeByPath(c.cell, dir)
	if p == nil {
		// Parent doesn't exist, let's create it if we need to.
		if version != nil {
			return nil, topo.NewError(topo.NoNode, filePath)
		}
		p = c.factory.getOrCreatePath(c.cell, dir)
		if p == nil {
			return nil, mterrors.Errorf(mtrpc.Code_FAILED_PRECONDITION, "trying to create file %v in topo %v in a path that contains files", filePath, c.cell)
		}
	}

	// Get the existing file.
	n, ok := p.children[file]
	if !ok {
		// File doesn't exist, see if we need to create it.
		if version != nil {
			return nil, topo.NewError(topo.NoNode, filePath)
		}
		n = c.factory.newFile(file, contents, p)
		p.children[file] = n
		return NodeVersion(n.version), nil
	}

	// Check if it's a directory.
	if n.isDirectory() {
		return nil, mterrors.Errorf(mtrpc.Code_INVALID_ARGUMENT, "Update(%v, %v) failed: it's a directory", c.cell, filePath)
	}

	// Check the version.
	if version != nil && n.version != uint64(version.(NodeVersion)) {
		return nil, topo.NewError(topo.BadVersion, filePath)
	}

	// Now we can update.
	n.version = c.factory.getNextVersion()
	n.contents = contents

	// Call the watches
	for _, w := range n.watches {
		if w.contents != nil {
			w.contents <- &topo.WatchData{
				Contents: n.contents,
				Version:  NodeVersion(n.version),
			}
		}
	}

	n.propagateRecursiveWatch(&topo.WatchDataRecursive{
		Path: filePath,
		WatchData: topo.WatchData{
			Contents: n.contents,
			Version:  NodeVersion(n.version),
		},
	})

	return NodeVersion(n.version), nil
}

// Get is part of topo.Conn interface.
func (c *conn) Get(ctx context.Context, filePath string) ([]byte, topo.Version, error) {
	// c.factory.callstats.Add([]string{"Get"}, 1)

	if err := c.dial(ctx); err != nil {
		return nil, nil, err
	}

	c.factory.mu.Lock()
	defer c.factory.mu.Unlock()

	if c.factory.err != nil {
		return nil, nil, c.factory.err
	}
	if err := c.factory.getOperationError(Get, filePath); err != nil {
		return nil, nil, err
	}

	// Get the node.
	n := c.factory.nodeByPath(c.cell, filePath)
	// This matches the other topo implementations of returning topo.NoNode when calling
	// Get() with a key prefix or "directory".
	if n == nil || n.contents == nil {
		return nil, nil, topo.NewError(topo.NoNode, filePath)
	}
	return n.contents, NodeVersion(n.version), nil
}

// GetVersion is part of topo.Conn interface.
func (c *conn) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
	return nil, topo.NewError(topo.NoImplementation, "GetVersion not supported in memory topo")
}

// List is part of the topo.Conn interface.
func (c *conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, error) {
	// c.factory.callstats.Add([]string{"List"}, 1)

	if err := c.dial(ctx); err != nil {
		return nil, err
	}

	c.factory.mu.Lock()
	defer c.factory.mu.Unlock()

	if c.factory.err != nil {
		return nil, c.factory.err
	}
	if err := c.factory.getOperationError(List, filePathPrefix); err != nil {
		return nil, err
	}

	dir, file := path.Split(filePathPrefix)
	// Get the node to list.
	n := c.factory.nodeByPath(c.cell, dir)
	if n == nil {
		return []topo.KVInfo{}, topo.NewError(topo.NoNode, filePathPrefix)
	}

	var result []topo.KVInfo
	for name, child := range n.children {
		if !strings.HasPrefix(name, file) {
			continue
		}
		if child.isDirectory() {
			result = append(result, gatherChildren(child, path.Join(dir, name))...)
		} else {
			result = append(result, topo.KVInfo{
				Key:     []byte(path.Join(dir, name)),
				Value:   child.contents,
				Version: NodeVersion(child.version),
			})
		}
	}

	if len(result) == 0 {
		return []topo.KVInfo{}, topo.NewError(topo.NoNode, filePathPrefix)
	}

	return result, nil
}

func gatherChildren(n *node, dirPath string) []topo.KVInfo {
	var result []topo.KVInfo
	for name, child := range n.children {
		if child.isDirectory() {
			result = append(result, gatherChildren(child, path.Join(dirPath, name))...)
		} else {
			result = append(result, topo.KVInfo{
				Key:     []byte(path.Join(dirPath, name)),
				Value:   child.contents,
				Version: NodeVersion(child.version),
			})
		}
	}
	return result
}

// Delete is part of topo.Conn interface.
func (c *conn) Delete(ctx context.Context, filePath string, version topo.Version) error {
	// c.factory.callstats.Add([]string{"Delete"}, 1)

	if err := c.dial(ctx); err != nil {
		return err
	}

	c.factory.mu.Lock()
	defer c.factory.mu.Unlock()

	if c.factory.err != nil {
		return c.factory.err
	}
	if err := c.factory.getOperationError(Delete, filePath); err != nil {
		return err
	}

	// Get the parent dir.
	dir, file := path.Split(filePath)
	p := c.factory.nodeByPath(c.cell, dir)
	if p == nil {
		return topo.NewError(topo.NoNode, filePath)
	}

	// Get the existing file.
	n, ok := p.children[file]
	if !ok {
		return topo.NewError(topo.NoNode, filePath)
	}

	// Check if it's a directory.
	if n.isDirectory() {
		return fmt.Errorf("delete(%v, %v) failed: it's a directory", c.cell, filePath)
	}

	// Check the version.
	if version != nil && n.version != uint64(version.(NodeVersion)) {
		return topo.NewError(topo.BadVersion, filePath)
	}

	// Now we can delete.
	c.factory.recursiveDelete(n)

	// Call the watches
	for _, w := range n.watches {
		if w.contents != nil {
			w.contents <- &topo.WatchData{
				Err: topo.NewError(topo.NoNode, filePath),
			}
			close(w.contents)
		}
		if w.lock != nil {
			close(w.lock)
		}
	}

	n.propagateRecursiveWatch(&topo.WatchDataRecursive{
		Path: filePath,
		WatchData: topo.WatchData{
			Err: topo.NewError(topo.NoNode, filePath),
		},
	})

	return nil
}
